001 /*
002 * SPDX-License-Identifier: Apache-2.0
003 *
004 * Copyright 2020-2022 Agorapulse.
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package com.agorapulse.micronaut.bigquery.impl;
019
import com.agorapulse.micronaut.bigquery.BigQueryService;
import com.agorapulse.micronaut.bigquery.RowResult;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.TableResult;
import io.reactivex.Flowable;

import javax.inject.Singleton;
import java.time.Instant;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.regex.Pattern;
039
040 @Singleton
041 public class DefaultBigQueryService implements BigQueryService {
042
043 private final BigQuery bigquery;
044
045 public DefaultBigQueryService(BigQuery bigQuery) {
046 this.bigquery = bigQuery;
047 }
048
049 @Override
050 public <T> Flowable<T> query(Map<String, ?> namedParameters, String sql, Function<RowResult, T> builder) {
051 QueryJobConfiguration queryConfig = QueryJobConfiguration
052 .newBuilder(checkForNulls(sql, namedParameters))
053 .setUseLegacySql(false)
054 .setNamedParameters(toNamedParameters(namedParameters))
055 .build();
056
057 // Create a job ID
058 JobId jobId = JobId.of(UUID.randomUUID().toString());
059 Job job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
060
061 try {
062 // Wait for the query to complete.
063 Job completedJob = job.waitFor();
064
065 // Check for errors
066 if (completedJob == null) {
067 throw new IllegalStateException("Job no longer exists");
068 } else if (completedJob.getStatus().getError() != null) {
069 throw new IllegalStateException("Failed to execute sql " + sql + ":" + completedJob.getStatus().getError());
070 }
071
072
073 TableResult result = completedJob.getQueryResults();
074 return Flowable.fromIterable(result.iterateAll()).filter(r -> !r.isEmpty()).map(FieldValueListRowResult::new).map(builder::apply);
075 } catch (InterruptedException | BigQueryException e) {
076 throw new IllegalStateException("Could not execute query: " + sql, e);
077 }
078 }
079
080 @Override
081 public void execute(Map<String, ?> namedParameters, String sql) {
082 QueryJobConfiguration queryConfig = QueryJobConfiguration
083 .newBuilder(checkForNulls(sql, namedParameters))
084 .setUseLegacySql(false)
085 .setNamedParameters(toNamedParameters(namedParameters))
086 .build();
087
088 // Create a job ID
089 JobId jobId = JobId.of(UUID.randomUUID().toString());
090 Job job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
091
092 try {
093 // Wait for the query to complete.
094 Job completedJob = job.waitFor();
095
096 // Check for errors
097 if (completedJob == null) {
098 throw new IllegalStateException("Job no longer exists");
099 } else if (completedJob.getStatus().getError() != null) {
100 throw new IllegalStateException(completedJob.getStatus().getError().toString());
101 }
102 } catch (InterruptedException | BigQueryException e) {
103 throw new IllegalStateException("Could not execute sql: " + sql, e);
104 }
105 }
106
107 @SuppressWarnings("unchecked")
108 private Map<String, QueryParameterValue> toNamedParameters(Map<String, ?> namedParameters) {
109 if (namedParameters.isEmpty()) {
110 return Collections.emptyMap();
111 }
112
113 final Map<String, QueryParameterValue> result = new LinkedHashMap<>();
114
115 namedParameters.forEach((key, value) -> {
116 Object converted = convertIfNecessary(value);
117 if (converted instanceof Instant) {
118 Instant instant = (Instant) converted;
119 result.put(key, QueryParameterValue.timestamp(instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000));
120 } else if (converted != null){
121 result.put(key, QueryParameterValue.of(converted, (Class<Object>) converted.getClass()));
122 }
123 });
124
125 return result;
126 }
127
128 private String checkForNulls(String sql, Map<String, ?> namedParameters) {
129 String result = sql;
130 for(Map.Entry<String, ?> entry : namedParameters.entrySet()) {
131 if (entry.getValue() == null) {
132 result = result.replace("@" + entry.getKey(), "null");
133 }
134 }
135 return result;
136 }
137
138 }
|